contents
Kafka Connect는 아파치 카프카와 다른 시스템 간에 데이터를 안정적으로 스트리밍하기 위한 프레임워크입니다. 이는 카프카에 포함된 도구로, 커스텀 코드를 작성하지 않고도 대규모 데이터셋을 카프카 안팎으로 이동시키는 간단하고, 확장 가능하며, 내결함성 있는 방법을 제공합니다.
Kafka Connect를 데이터를 위한 강력한 만능 어댑터 🔌 세트라고 생각할 수 있습니다. 카프카에 연결하려는 모든 데이터베이스나 애플리케이션에 대해 매번 커스텀 커넥터를 만드는 대신, Kafka Connect의 재사용 가능한 구성 요소를 사용합니다.
Kafka Connect가 해결하는 문제
Kafka Connect 이전에는 PostgreSQL 데이터베이스의 데이터를 카프카 토픽으로 가져오려면 커스텀 애플리케이션(카프카 프로듀서)을 작성해야 했습니다. 그런 다음 그 토픽의 데이터를 Elasticsearch로 옮기려면 또 다른 커스텀 애플리케이션(카프카 컨슈머)을 작성해야 했습니다.
이 접근 방식에는 몇 가지 문제가 있습니다.
- 상용구 코드: 모든 커스텀 커넥터는 오류 처리, 재시도, 데이터 직렬화, 오프셋 관리를 직접 처리해야 합니다. 이는 반복적이고 복잡합니다.
- 확장성 부족: 커스텀 앱의 여러 인스턴스를 어떻게 병렬로 실행할까요? 한 인스턴스가 실패하면 작업을 어떻게 재조정할까요?
- 중앙 관리 부재: 각 커스텀 커넥터는 독립적으로 배포, 모니터링, 설정해야 하는 별개의 애플리케이션입니다.
Kafka Connect는 이 모든 어려운 부분을 처리하는 표준화된 프레임워크를 제공하여, 코드가 아닌 설정에 집중할 수 있게 함으로써 이러한 문제를 해결합니다.
핵심 아키텍처 및 구성 요소 ⚙️
Kafka Connect는 카프카 브로커와는 별개의 프로세스(또는 프로세스 클러스터)로 실행됩니다. 명확한 계층 구조를 가집니다.
- 워커(Worker): Kafka Connect 프레임워크를 실행하는 JVM 프로세스입니다. 커넥터와 그 태스크들을 실행하는 책임을 집니다.
- 커넥터(Connector): 작업을 정의하는 상위 수준의 설정입니다. 특정 외부 시스템과 통신하는 방법을 아는 "플러그인"입니다. 커넥터 자체는 데이터를 옮기지 않으며, 태스크(Task) 를 생성하고 관리하는 관리자 역할을 합니다.
- 태스크(Task): "일벌"입니다. 태스크는 실제로 데이터 전송을 수행하는 경량 스레드입니다. 단일 커넥터는 병렬 작업을 위해 여러 태스크를 생성하도록 설정될 수 있으며, 이것이 Kafka Connect 확장성의 핵심입니다.
주로 설정하는 구성 요소
데이터 파이프라인을 설정할 때 주로 다음 세 가지 구성 요소를 다루게 됩니다.
1. 커넥터 (Connectors)
커넥터는 외부 데이터 저장소와 인터페이스하는 플러그인입니다. 두 가지 유형이 있습니다.
- 소스 커넥터(Source Connector): 외부 시스템(소스)에서 데이터를 가져와 카프카 토픽으로 발행합니다.
- 예시:
JdbcSourceConnector는 데이터베이스 테이블에서 새로운 행을 감시하고 각 행을 카프카 토픽에 메시지로 씁니다.
- 예시:
- 싱크 커넥터(Sink Connector): 카프카 토픽에서 데이터를 읽어 외부 시스템(싱크)에 씁니다.
- 예시:
ElasticsearchSinkConnector는 카프카 토픽을 구독하고 각 메시지를 Elasticsearch 인덱스에 도큐먼트로 씁니다.
- 예시:
Confluent Hub와 같은 플랫폼에는 거의 모든 인기 있는 데이터 시스템(데이터베이스, 클라우드 스토리지, SaaS API 등)을 위한 방대한 양의 사전 구축된 커넥터가 있습니다.
2. 컨버터 (Converters)
카프카 토픽은 원시 바이트만 저장합니다. 컨버터는 데이터가 Kafka Connect를 드나들 때 데이터의 직렬화 및 역직렬화를 처리하는 중요한 구성 요소입니다. 데이터 형식을 정의합니다.
JsonConverter: 데이터를 간단한 JSON 형식으로 변환합니다. 가독성에는 좋지만 스키마가 없습니다.AvroConverter: Avro 바이너리 형식을 사용하여 데이터를 변환하고 스키마 레지스트리와 통합됩니다. 효율적이고 데이터 일관성을 보장하므로 운영 환경에서 권장되는 접근 방식입니다.
3. 트랜스폼 (Single Message Transforms - SMTs)
트랜스폼은 별도의 스트림 처리 애플리케이션 없이 파이프라인을 통과하는 메시지에 대해 간단한 인라인 수정을 수행할 수 있는 매우 유용한 기능입니다.
- 비유: 커넥터가 전원 어댑터라면, 트랜스폼은 거기에 부착하는 작은 플러그 변환기입니다.
- 예시:
- 메시지의 필드 이름 변경 또는 삭제.
- 정적 필드 추가 (예: 데이터 소스 식별자).
- 필드의 데이터 타입 변경.
운영 모드
Kafka Connect는 두 가지 모드로 실행할 수 있습니다.
단독 모드 (Standalone Mode)
- 정의: 단일 워커 프로세스가 모든 커넥터와 태스크를 실행합니다.
- 설정: 로컬 머신의 단일 속성 파일을 통해 관리됩니다.
- 사용 사례: 개발, 테스트 또는 간단하고 중요하지 않은 작업에 이상적입니다.
- 단점: 단일 장애 지점(single point of failure) 입니다. 워커 프로세스가 죽으면 모든 데이터 파이프라인이 중지됩니다.
분산 모드 (Distributed Mode) 🚀
- 정의: 여러 워커가 다른 머신에서 실행되어 클러스터를 형성합니다.
- 설정: REST API를 통해 관리됩니다. 커넥터 설정을 API에 제출하면 클러스터가 나머지를 처리합니다.
- 주요 특징:
- 확장성: 태스크가 클러스터의 모든 가용 워커에 걸쳐 자동으로 균형을 이룹니다.
- 내결함성: 워커 노드가 실패하면 클러스터는 해당 커넥터와 태스크를 다른 정상 워커에 자동으로 재할당합니다.
- 사용 사례: 이것이 모든 운영 환경의 표준입니다.
실제 예시: JDBC 소스 커넥터
MySQL 데이터베이스의 users 테이블에서 모든 신규 사용자를 카프카 토픽으로 스트리밍하고 싶다고 가정해 봅시다.
- Kafka Connect를 분산 모드로 실행합니다.
- 새로운
JdbcSourceConnector를 정의하는 JSON 설정을 REST API에 제출합니다. - 이 설정에는 다음이 포함됩니다.
- 커넥터 클래스 (
io.confluent.connect.jdbc.JdbcSourceConnector). - 데이터베이스 연결 정보 (URL, 사용자, 암호).
- 모니터링할 테이블 (
table.whitelist: "users"). - 새로운 행을 감지하는 모드 (예:
mode: "timestamp",updated_at열을 사용하여 신규 또는 업데이트된 행 찾기). - 데이터를 쓸 카프카 토픽 (
topic.prefix).
- 커넥터 클래스 (
- 다음 과정: Kafka Connect 클러스터는 이 커넥터를 위한 태스크(또는 여러 태스크)를 생성합니다. 이 태스크는 주기적으로
users테이블을 쿼리하고(SELECT * FROM users WHERE updated_at > ?), 각 새로운 행을 설정된 형식(예: Avro)으로 변환한 후, 처리한 마지막updated_at타임스탬프를 자동으로 관리하며users카프카 토픽에 발행합니다.
references